package org.example.launch

import com.alibaba.fastjson.{JSON, JSONObject}
import org.example.bean.CarGPS
import org.example.common.{Logging, Sparking}
import org.example.constant.ApolloConst
import org.example.dao.DrivingPeriod
import org.example.service.HiddenPeriodProcess
import org.example.service.HiddenRouteProcess.{processCharterRoute, processPassRoute, processPermitRoute}
import org.example.service.ProduceHidden.processHidden
import org.example.utils.MapUtils.{getArea, getVehicleIsInArea}
import org.example.utils._
import io.searchbox.client.JestClient
import io.searchbox.core.Search
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import redis.clients.jedis.Jedis
import java.awt.geom.Point2D
import java.{lang, util}

import org.apache.kafka.common.serialization.StringDeserializer

import scala.beans.BeanProperty
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.Breaks.breakable

/**
 * @Description 实时隐患生成，发送到kafka topic: transport_supervise_topic_ys
 *              zcov.supervise_process_info
 *              需要StaticToRedis配合
 * @date 2022/5/5 17:44
 */
object HiddenPerilModel extends Sparking with Logging{
  @volatile private var instance: Broadcast[Map[(String, String), (String, String,String)]] = null
  @volatile private var electronicFence: Broadcast[Map[String, ElectronicFenceIndex]] = null
  @volatile private var hiddenRule: Broadcast[Map[String, Boolean]] = null


  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().config(conf)
      .config("hive.metastore.uris", ApolloConst.hiveMetastore)
      .enableHiveSupport()
      .getOrCreate()


    val ssc = new StreamingContext(session.sparkContext, Seconds(30))

    val groupId = "hiddenPerilModel3.2"
    val topic = Array(ApolloConst.hiddenPerilModelTopic)
    //val kafkaParams = getKafkaParams(ApolloConst.bootstrap, groupId)
    val kafkaParams=Map[String, Object](
      "bootstrap.servers" -> ApolloConst.bootstrap,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "reconnect.backoff.ms" -> "0")

    val offsets = OffsetManager.apply(groupId, topic)


    val inputStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topic, kafkaParams, offsets))

    var offsetRanges = Array.empty[OffsetRange]

    val inputStream1: DStream[ConsumerRecord[String, String]] = inputStream.transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }

    //广播redis连接池
    val redisSink: RedisSink = RedisSink()
    //更新redis中人、车、企基本信息（已录入数据）
    redisSink.usingRedis(redis => {
      redis.select(9)
      SearchInfo.updateBaseInfoForHidden(redis)
    })



    //val vehicleEnterpriseMap: Map[(String, String), (String, String)] = getVehicleEnterpriseMap(session)
    //车辆对应的企业编码和企业名称
    val vehicleBroadcast: Broadcast[Map[(String, String), (String, String,String)]] = getInstance(session)
    val electronicFenceBroadcast: Broadcast[Map[String, ElectronicFenceIndex]] = getElectronicFence(session)
    val hiddenRuleBroadcast: Broadcast[Map[String, Boolean]] = getHiddenRule(session)


    val carGpsDStream: DStream[CarGPS] = inputStream1.mapPartitions { iter =>
      val listBuffer: ListBuffer[CarGPS] = new ListBuffer[CarGPS]()
      val vehicleColorMap = Map[String, String]("1" -> "蓝色", "2" -> "黄色", "3" -> "黑色", "4" -> "白色", "9" -> "其他") //车牌颜色map

      val vehicleEnterpriseMap: Map[(String, String), (String, String,String)] = vehicleBroadcast.value


      if (null != iter && iter.nonEmpty) {
        val array: Array[ConsumerRecord[String, String]] = iter.toArray
        for (elem <- array) {
          val jsonStr: String = elem.value()
          if (jsonStr.startsWith("{") && jsonStr.endsWith("}")) {
            val jSONObject: JSONObject = JSON.parseObject(jsonStr)
            if (null != jSONObject.getString("dataType") && jSONObject.getString("dataType").equals("0x1202")) {
              val carGPS: CarGPS = JSON.parseObject(jSONObject.toJSONString, classOf[CarGPS])
              carGPS.vehicleColor = vehicleColorMap.getOrElse(carGPS.vehicleColor, "unknown")
              val enterpriseCodeAndNameAndSocial: (String, String,String) = vehicleEnterpriseMap.getOrElse((carGPS.vehicleNo, carGPS.vehicleColor), ("未知企业编码", "未知企业","未知社会信用代码"))
              carGPS.dealDeptCode = enterpriseCodeAndNameAndSocial._1 //处理部门编码=>已经变为生成企业编码
              carGPS.enterpriseName = enterpriseCodeAndNameAndSocial._2 //处理部门名称
              carGPS.socialCreditCode=enterpriseCodeAndNameAndSocial._3  //社会信用代码
              carGPS.dateTime = DateTime.parse(carGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd HH:mm:ss")
              carGPS.lat = (carGPS.lat.toDouble / 1000000).toString
              carGPS.lon = (carGPS.lon.toDouble / 1000000).toString
              //              val mars = Tools.wgs84togcj02(carGPS.lon.toDouble, carGPS.lat.toDouble)
              //              carGPS.lon = mars._1.toString
              //              carGPS.lat = mars._2.toString
              listBuffer.append(carGPS)
            }
          }
        }
      }
      listBuffer.toIterator
    }.filter { data => data.vehicleColor != "unknown" && data.dealDeptCode != "未知企业编码" && data.enterpriseName != "未知企业" } //过滤掉车牌颜色为unknown的车

    carGpsDStream.foreachRDD { rdd =>
      val nowTime = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
      val electronicFenceBroadcast: Broadcast[Map[String, ElectronicFenceIndex]] = getElectronicFence(session)
      val hiddenRuleBroadcast: Broadcast[Map[String, Boolean]] = getHiddenRule(session)
      if (nowTime.substring(11,16)=="05:00"){
        //每日需要更新一次的广播变量
        updateBroadCastVarDaily(session,true)
      }else{
        //每个批次需要更新的广播变量
        updateBroadCastVar(session,true)

      }
      if (!rdd.isEmpty()) {
        val groupedRDD: RDD[((String, String), Iterable[CarGPS])] = rdd.map { data => ((data.vehicleNo, data.vehicleColor), data) }.groupByKey()

        groupedRDD.foreachPartition { iter =>
          val kafkaProducer: KafkaProducer[String, String] = KafkaUtil.getKafkaProducer(ApolloConst.bootstrap)
          val sink: RedisSink = RedisSink() //获取redis客户端
          val sink2: RedisSink = RedisSink2() //获取redis客户端

          sink.usingRedis { jedis =>
            jedis.select(9)
            if (null != iter && iter.nonEmpty) {
              val keyedGpsArr: Array[((String, String), Iterable[CarGPS])] = iter.toArray

              for (elem <- keyedGpsArr) {

                var useNature=""
                var controlType=""

                val plateNo: String = elem._1._1 //车牌号码
                val plateColor: String = elem._1._2 // 车牌颜色
                val gpsIter: Iterable[CarGPS] = elem._2
                sink2.usingRedis { jedis2 =>
                  jedis2.select(3)
                  val vehicleColorMap2 = Map[String, String]("蓝色" -> "1", "黄色" -> "2", "黑色" -> "3", "白色" -> "4","绿色" -> "5", "其他" -> "9") //车牌颜色map
                  val vehicleColor=vehicleColorMap2.getOrElse(plateColor, "unknown")
                  useNature= jedis2.hget("useNature:"+plateNo+"_"+vehicleColor, "useNature")
                  controlType=jedis2.hget("useNature:"+plateNo+"_"+vehicleColor, "controlType")
                }
                if (useNature ==null || useNature ==""){
                  useNature=jedis.hget("useNature:"+plateNo+"_"+plateColor, "useNature")
                  controlType=jedis.hget("useNature:"+plateNo+"_"+plateColor, "controlType")
                }

                if (useNature ==null || useNature ==""){
                  useNature="-1"
                }
                if (controlType ==null || controlType ==""){
                  controlType="-1"
                }
                var industry_type_code: String = "-1"
                var industry_name: String = "-1"

                if (controlType!="-1"&&controlType!=null&&controlType!=""){
                  industry_type_code=jedis.hget("useNatureToIndustry", controlType)
                  industry_name=jedis.hget("useNatureToIndustryName",controlType)
                }else if (useNature!="-1"&&useNature!=null&&useNature!=""){
                  industry_type_code=jedis.hget("useNatureToIndustry", useNature)
                  industry_name=jedis.hget("useNatureToIndustryName",useNature)


                }


                if (industry_type_code==null||industry_type_code==""){
                  industry_type_code="10011004"
                  industry_name="-1"
                }


                val electronicFenceMap: Map[String, ElectronicFenceIndex] = electronicFenceBroadcast.value
                val hiddenRuleMap: Map[String, Boolean] = hiddenRuleBroadcast.value


                //超速高发（一路狂奔）(yh_veh_007)/超速（屡教不改）(yh_veh_008)
                //杭州为 60s 为一个间隔，连续 5 次
                //贵阳为 75s 为一个间隔，连续 3 次
                //processOverSpeedContinous(gpsIter, jedis, kafkaProducer, 3, industry_type_code, industry_type_code,industry_name)
                //道路运输证 :过期(yh_veh_013) 、无道路运输证(yh_veh_014)
                processTransportCertificate(gpsIter, jedis, kafkaProducer, industry_type_code, industry_type_code,industry_name,useNature,controlType,electronicFenceMap,hiddenRuleMap)

              }
            }
          }
        }
      }
      OffsetManager.saveCurrentBatchOffset(groupId, offsetRanges, ssc)
    }


    ssc.start()
    ssc.awaitTermination()
  }

  /**
   *  获取广播变量单例对象
   *
   * @param session
   */
  def getInstance(session: SparkSession): Broadcast[Map[(String, String), (String, String,String)]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = session.sparkContext.broadcast(getVehicleEnterpriseMap(session))
        }
      }
    }
    instance
  }
  /**
   *  获取广播变量单例对象
   *
   * @param session
   */
  def getElectronicFence(session: SparkSession): Broadcast[Map[String, ElectronicFenceIndex]] = {
    if (electronicFence == null) {
      synchronized {
        if (electronicFence == null) {
          electronicFence = session.sparkContext.broadcast(getElectronicFenceMap(session))
        }
      }
    }
    electronicFence
  }

  /**
   *  获取广播变量单例对象
   *
   * @param session
   */
  def getHiddenRule(session: SparkSession): Broadcast[Map[String, Boolean]] = {
    if (hiddenRule == null) {
      synchronized {
        if (hiddenRule == null) {
          hiddenRule = session.sparkContext.broadcast(getHiddenRuleMap(session))
        }
      }
    }
    hiddenRule
  }


  /**
   *  每日动态更新广播变量
   *
   * @param session
   * @param blocking
   */
  def updateBroadCastVarDaily(session: SparkSession, blocking: Boolean = false): Unit = {
    if (instance != null) {
      //删除缓存在executors上的广播副本，并可选择是否在删除完成后进行block等待
      //底层可选择是否将driver端的广播副本也删除
      instance.unpersist(blocking)

      instance = session.sparkContext.broadcast(getVehicleEnterpriseMap(session))
    }

    if (electronicFence != null) {
      //删除缓存在executors上的广播副本，并可选择是否在删除完成后进行block等待
      //底层可选择是否将driver端的广播副本也删除
      electronicFence.unpersist(blocking)

      electronicFence = session.sparkContext.broadcast(getElectronicFenceMap(session))
    }

  }

  /**
   *  动态更新广播变量
   *
   * @param session
   * @param blocking
   */
  def updateBroadCastVar(session: SparkSession, blocking: Boolean = false): Unit = {

    if (hiddenRule != null) {
      logger.info("hiddenRule")
      //删除缓存在executors上的广播副本，并可选择是否在删除完成后进行block等待
      //底层可选择是否将driver端的广播副本也删除
      hiddenRule.unpersist(blocking)

      hiddenRule = session.sparkContext.broadcast(getHiddenRuleMap(session))
    }
  }




  /**
   * 获取车辆对应的企业编码和企业名称 Map[(plateNo, plateColor) -> (enterpriseCode, enterpriseName)]
   *
   * @param session
   * @return
   */
  def getVehicleEnterpriseMap(session: SparkSession): Map[(String, String), (String, String,String)] = {

    import session.implicits._
    session.sql(
      """
        |SELECT
        | t1.vehicle_number plate_num,
        |	t1.vehicle_plate_color plate_color,
        |	t1.business_owner_id enterprise_code,
        |	t2.business_owner_name enterprise_name,
        | t3.social_credit_code
        |
        |FROM
        |  (
        |  select
        |   vehicle_id,
        |   vehicle_number,
        |   vehicle_plate_color,
        |   business_owner_id,
        |   operation_state
        |  FROM
        |     (SELECT
        |        *,
        |        ROW_NUMBER() OVER(PARTITION BY vehicle_number,vehicle_plate_color ORDER BY file_create_date DESC) rk
        |      FROM dwd.dwd_yz_vehicle_info
        |      where TRIM(vehicle_number) !=''
        |      and TRIM(vehicle_plate_color) !=''
        |      and TRIM(business_owner_id) != ''
        |      and business_owner_id is not null
        |      and trim(vehicle_id) != ''
        |      and vehicle_id is not null )t
        |   where t.rk = 1
        |  )t1
        |left JOIN
        |	dwd.dwd_yz_company_info t2
        | ON t1.business_owner_id = t2.business_owner_id
        | left join
        | (SELECT
        |      business_owner_id,
        |      business_owner_economic_category,
        |      case WHEN trim(business_registration_number) != '' and business_registration_number is not NULL then trim(business_registration_number)
        |           WHEN (trim(business_registration_number) = '' or business_registration_number is NULL ) and (trim(organization_code_card) != '' and organization_code_card is not NULL) then trim(organization_code_card)
        |           WHEN (trim(business_registration_number) = '' or business_registration_number is NULL)  and (trim(organization_code_card) = '' or organization_code_card is NULL ) and (trim(organization_code) != '' and organization_code is not NULL) then trim(organization_code)
        |           END social_credit_code, --统一社会信用代码
        |	     legal_representative_number_of_id_certificate, -- 运政工商信息-法人身份证
        |	     business_registration_number, -- 运政工商信息-工商注册号
        |	     organization_code_card, -- 运政工商信息-组织机构代码证
        |	     organization_code -- 运政工商信息-组织机构代码
        |   from
        |      dwd.dwd_yz_business_info
        |         where trim(business_owner_id) != ''
        |   and business_owner_id is not null
        |   and (LENGTH(TRIM(organization_code)) >1
        |   or LENGTH(TRIM(organization_code_card)) >1
        |   or LENGTH(TRIM(business_registration_number)) >1)
        | )t3
        | on t1.business_owner_id = t3.business_owner_id
        |""".stripMargin).map { row =>
      val plateNo: String = row.getAs[String]("plate_num")
      val plateColor: String = row.getAs[String]("plate_color")
      val enterpriseCode: String = row.getAs[String]("enterprise_code")
      val enterpriseName: String = row.getAs[String]("enterprise_name")
      val social_credit_code: String = row.getAs[String]("social_credit_code")


      (plateNo, plateColor) -> (enterpriseCode, enterpriseName,social_credit_code)
    }.collect().toMap
  }

  /**
   * 获取车辆对应的企业编码和企业名称 Map[(plateNo, plateColor) -> (enterpriseCode, enterpriseName)]
   *
   * @param session
   * @return
   */
  def getElectronicFenceMap(session: SparkSession): Map[String, ElectronicFenceIndex] = {
    val client: JestClient = EsUtils.getClient()
    val query: String =
      s"""
         |{
         |  "size":10000,
         |  "query": {
         |    "match_all": {}
         |  }
         |}
         |""".stripMargin
    val search: Search = new Search.Builder(query).addIndex("electronic_fence_index").addType("electronic_fence_type").build()
    var electronicFenceList = client.execute(search).getSourceAsObjectList(classOf[ElectronicFenceIndex],false).asScala

    val tuples: List[(String, ElectronicFenceIndex)]=electronicFenceList.toList.map { electronicFence => {
      (electronicFence.name, electronicFence)
    }
    }
    tuples.toMap
  }

  /**
   * 获取隐患对应规则
   *
   * @param session
   * @return
   */
  def getHiddenRuleMap(session: SparkSession): Map[String, Boolean] = {

    var map = mutable.Map[String,Boolean]()
    val sql =
      """
        |select id,enable_state from zcov.supervise_config where deleted=0
        |""".stripMargin

    val rowItr: util.Iterator[GenericRowWithSchema] = MysqlUtil.getMysqlQueryRow(MysqlUtil.getMysqlQueryResult(sql))
    while (rowItr.hasNext) {
      val row = rowItr.next()
      val id: String = row.getAs[Int]("id").toString
      val enable_state: Boolean = row.getAs[Boolean]("enable_state")
      map = map+(id -> enable_state)
    }
    map.toMap
  }




  /**
   * 处理道路运输证的业务（道路运输证过期、无道路运输证）
   *
   * @param carGPS
   * @param jedis
   * @param kafkaProducer
   * @param industry 所属行业
   * @param typeCode
   */
  def processTransportCertificate(carGPS: Iterable[CarGPS], jedis: Jedis, kafkaProducer: KafkaProducer[String, String], industry: String, typeCode: String,industry_name:String,
                                  useNature:String,controlType:String,electronicFenceMap: Map[String, ElectronicFenceIndex],hiddenRuleMap: Map[String, Boolean]): Unit = {


    val continousTimes=3

    val carGPSArr: Array[CarGPS] = carGPS.toArray

    val sortedCarGPSArr: Array[CarGPS] = carGPSArr.sortBy(_.dateTime)

    if (null != sortedCarGPSArr && sortedCarGPSArr.nonEmpty) {
      breakable {
        for (carGPS <- sortedCarGPSArr) {
          logger.info(carGPS.vehicleNo)

          // 行驶周期的计算
          val periodStatus = HiddenPeriodProcess.drivingPeriod(carGPS, jedis)
          val isNewPeriod = periodStatus._1
          logger.info(periodStatus._1+":"+periodStatus._2+":"+periodStatus._3+":"+periodStatus._4)
          val primaryKey = carGPS.vehicleNo + "#" + carGPS.vehicleColor
          val drivingPeriodStr: String = jedis.hget("hiddenDrivingPeriod", primaryKey)
          logger.info("当前车辆所处的行驶周期"+drivingPeriodStr)

          val drivingPeriod = JSON.parseObject(drivingPeriodStr, classOf[DrivingPeriod])
          //行程开始时间
          val tripStartTime: String = drivingPeriod.ts

          logger.info("进入证件隐患判断")
          val dateTime: String = DateTime.parse(carGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd") //事件发生时间
          val dateTimeStr: String = DateTime.parse(carGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("HH:mm:ss")

          val monthDateTime:String = DateTime.parse(carGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM")
          val vehicleInfo: util.Map[String, String] = jedis.hgetAll("transportCertificateDate:" + carGPS.vehicleNo + "_" + carGPS.vehicleColor)

          val operation_state = vehicleInfo.get("operation_state")
          val business_scope = vehicleInfo.get("business_scope")


          val road_transportation_certificate_classification_number = vehicleInfo.get("road_transportation_certificate_classification_number")
          val insure_vehicle_id = vehicleInfo.get("insure_vehicle_id")
          val branum = vehicleInfo.get("branum")
          val bracolor = vehicleInfo.get("bracolor")
          val start_point_name = vehicleInfo.get("start_point_name")
          val aim_name = vehicleInfo.get("aim_name")
          val alignments = vehicleInfo.get("alignments")
          val is_return_passenger = vehicleInfo.get("is_return_passenger")



          //班线
          val tranship_plate_num = vehicleInfo.get("tranship_plate_num")
          val line_vehicle_id = vehicleInfo.get("line_vehicle_id")


          //包车
          val grade_charter_mark = vehicleInfo.get("grade_charter_mark")
          val charter_start_date = vehicleInfo.get("charter_start_date")
          val charter_end_date = vehicleInfo.get("charter_end_date")
          val charter_range = vehicleInfo.get("charter_range")

          //通行证
          val pass_plate_num=vehicleInfo.get("pass_plate_num")
          //通行时间-间隔天数  例如：0-1-14:00-15:00,0-1-16:00-17:00,0-1  0 每日，1 周一到周五，2 周六周日，3 工作日，4 节假日  0 全天，1 定时
          val pass_pass_time_interval=vehicleInfo.get("pass_pass_time_interval")    //0-1-14:00-15:00|?
          val pass_pass_route_location=vehicleInfo.get("pass_pass_route_location")  //10,20;20,30;40,50|?
          val pass_gmt_end=vehicleInfo.get("pass_gmt_end")

          //准运证
          val permit_plate_num=vehicleInfo.get("permit_plate_num")
          val permit_license_no=vehicleInfo.get("permit_license_no")
          //通行时间-间隔天数  例如：0-1-14:00-15:00,0-1-16:00-17:00,0-1  0 每日，1 周一到周五，2 周六周日，3 工作日，4 节假日  0 全天，1 定时
          val pass_time_interval=vehicleInfo.get("pass_time_interval")    //0-1-14:00-15:00|?
          val pass_route_location=vehicleInfo.get("pass_route_location")  //10,20;20,30;40,50|?
          val gmt_end=vehicleInfo.get("gmt_end")

          var nowDateTime = "00:00"
          if (carGPS.dateTime.length>16){
            nowDateTime=carGPS.dateTime.substring(11,16)
          }



          val expireDate = vehicleInfo.get("expire_date")
          var annualDate = vehicleInfo.get("annual_date")
          if (annualDate!=null&&annualDate!="null"&&annualDate.length>8){
            annualDate=annualDate.substring(0,7)
          }
          val nextApprovedDate = vehicleInfo.get("next_approved_date")
          val certificate_date = vehicleInfo.get("certificate_date")
          val carrier_liability_insurance_effective_date = vehicleInfo.get("carrier_liability_insurance_effective_date")


          val electronicFenceName = vehicleInfo.get("electronicFenceName")



          // 隐患默认发往哪个topic
          var transport_supervise_topic="transport_supervise_topic_ys"
          var hiddenName="未知隐患"
          var superviseConfigId=""
          var superviseRuleConfigId=""
          var content=""
          var keyOfRedis=""
          var yesTodayKeyOfRedis=""
          var generateObject=carGPS.vehicleNo
          var generateObjectCode=carGPS.vehicleNo

          val todayStr: String = new DateTime().toString("yyyy-MM-dd")
          val yesterdayStr: String = DateTime.parse(todayStr, DateTimeFormat.forPattern("yyyy-MM-dd")).plusDays(-1).toString("yyyy-MM-dd")

          if (operation_state=="营运") {
            if (isNewPeriod && (industry_name == "普货" || industry_name == "危货" || industry_name == "客运")) { //普货、危货、客运
              if (road_transportation_certificate_classification_number == "null" || road_transportation_certificate_classification_number == "") { //从redis中获取道路运输证号，如果为null，则表示无道路运输证
                hiddenName = "无道路运输证"
                superviseConfigId = "14"
                superviseRuleConfigId = "14"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                keyOfRedis = "notHaveTransportCertificate_" + todayStr
                yesTodayKeyOfRedis = "notHaveTransportCertificate_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且未查到道路运输证信息"
                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              } else if (expireDate != "" && dateTime > expireDate) { //营运时间大于过期时间且 过期时间不为空串（空串暂不判断）
                //todo 注意隐患生成时间存在异常数据
                hiddenName = "道路运输证过期"
                superviseConfigId = "13"
                superviseRuleConfigId = "13"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName + ",过期时间为：" + expireDate
                keyOfRedis = "transportCertificateExpire_" + todayStr
                yesTodayKeyOfRedis = "transportCertificateExpire_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且道路运输证已在" + expireDate + "过期"

                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              }
            }
            if (isNewPeriod && (industry_name == "普货" || industry_name == "危货" || industry_name == "客运")) { //普货、危货、客运
              if (annualDate == "null") {
                //车辆基础信息表匹配不到年审数据
              } else if (annualDate != "" && monthDateTime > annualDate) {
                hiddenName = "道路运输证逾期未年审"
                superviseConfigId = "21"
                superviseRuleConfigId = "21"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName + ",过期时间为：" + annualDate
                keyOfRedis = "transportCertificateAnnual_" + todayStr
                yesTodayKeyOfRedis = "transportCertificateAnnual_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且道路运输证已在" + annualDate + "逾期未年审"
                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              }
            }
            if (isNewPeriod && (industry_name == "普货" || industry_name == "危货" || industry_name == "客运")) { //普货、危货、客运
              if (nextApprovedDate == "null") {
                //车辆基础信息表匹配不到技术评定数据
              } else if (nextApprovedDate != "" && dateTime > nextApprovedDate) {
                hiddenName = "车辆技术评定逾期"
                superviseConfigId = "22"
                superviseRuleConfigId = "22"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName + "，过期时间为：" + nextApprovedDate
                keyOfRedis = "transportCertificateGrande_" + todayStr
                yesTodayKeyOfRedis = "transportCertificateGrande_" + yesterdayStr
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且车辆技术评定已在" + nextApprovedDate + "逾期"

                //transport_supervise_topic="transport_supervise_topic_test"
                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              }
            }
            if (isNewPeriod && (industry_name == "普货" || industry_name == "危货" || industry_name == "客运")) { //普货、危货、客运
              if (certificate_date == "null") {
                //车辆基础信息表匹配不到企业经营许可证数据
              } else if (certificate_date != "" && dateTime > certificate_date) {
                hiddenName = "企业经营许可证过期"
                superviseConfigId = "23"
                superviseRuleConfigId = "23"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName + ",过期时间为：" + certificate_date
                keyOfRedis = "transportCertificateCertificate_" + todayStr
                yesTodayKeyOfRedis = "transportCertificateCertificate_" + yesterdayStr
                val hiddenRecordKeyOfRedis = "enterpriseHiddenRecord:" + todayStr + ":" + carGPS.dealDeptCode + "_" + carGPS.enterpriseName + "_" + typeCode + "_" + certificate_date
                val newGenerateObject = carGPS.enterpriseName
                val newGenerateObjectCode = carGPS.socialCreditCode
                //transport_supervise_topic="transport_supervise_topic_test"

                processEnterpriseHidden(jedis, newGenerateObjectCode, carGPS.enterpriseName, carGPS, keyOfRedis, yesTodayKeyOfRedis, hiddenRecordKeyOfRedis, tripStartTime)
                //processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId,newGenerateObject,newGenerateObjectCode,transport_supervise_topic,tripStartTime)
              }
            }
            if (isNewPeriod && industry_name == "客运") { //客运
              if (insure_vehicle_id == "null" || insure_vehicle_id == "") { //从redis中获投保车辆id，如果为null，则表示未购置承运人责任险
                hiddenName = "未购置承运人责任险"
                superviseConfigId = "28"
                superviseRuleConfigId = "28"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                keyOfRedis = "notHaveTransportInsure_" + todayStr
                yesTodayKeyOfRedis = "notHaveTransportInsure_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且未查到承运人责任险信息"

                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)

              } else if (carrier_liability_insurance_effective_date != "" && dateTime > carrier_liability_insurance_effective_date) { //营运时间大于过期时间且 过期时间不为空串（空串暂不判断）
                hiddenName = "承运人责任险逾期"
                superviseConfigId = "29"
                superviseRuleConfigId = "29"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName + ",过期时间为：" + carrier_liability_insurance_effective_date
                keyOfRedis = "transportInsureExpire_" + todayStr
                yesTodayKeyOfRedis = "transportInsureExpire_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且承运人责任险已在" + carrier_liability_insurance_effective_date + "逾期"


                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              }
            }
            if (useNature == "78") { //旅游包车 ----包车牌
              logger.info("旅游包车" + electronicFenceName)

              //电子围栏
              val electronicFenceIndex = electronicFenceMap.getOrElse(electronicFenceName, null)
              var area: List[Point2D.Double] = List.empty
              if (electronicFenceName != "null" && electronicFenceIndex != null) {
                logger.info("name：" + electronicFenceIndex.name)

                area = getArea(electronicFenceIndex.area_geom)


              }
              if (isNewPeriod && branum == "null") {
                hiddenName = "未申请包车牌"
                superviseConfigId = "15"
                superviseRuleConfigId = "15"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                keyOfRedis = "notHaveTransportBranum_" + todayStr
                yesTodayKeyOfRedis = "notHaveTransportBranum_" + yesterdayStr
                val newGenerateObjectCode = branum + "_" + bracolor + "_" + grade_charter_mark + "_" + charter_start_date + "_" + charter_end_date
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且未查到包车牌信息"


                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, newGenerateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              } else {
                if (carGPS.lon.toDouble > 1 && carGPS.lat.toDouble > 1 && area.nonEmpty && !getVehicleIsInArea(carGPS.lon.toDouble, carGPS.lat.toDouble, area)) {
                  hiddenName = "客运包车超范围经营"
                  superviseConfigId = "31"
                  superviseRuleConfigId = "31"
                  content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                  keyOfRedis = "TransportOutOfScope_" + todayStr
                  val newGenerateObjectCode = branum + "_" + bracolor + "_" + grade_charter_mark + "_" + charter_start_date + "_" + charter_end_date
                  yesTodayKeyOfRedis = "TransportOutOfScope_" + yesterdayStr
                  //transport_supervise_topic="transport_supervise_topic_test"
                  val generate_detail = charter_range + "出" + electronicFenceIndex.name + "营运"

                  processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, newGenerateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
                }

                if (start_point_name != "null" && aim_name != "null" && alignments != "null") {
                  //包车路线周期
                  processCharterRoute(kafkaProducer, carGPS, jedis, industry, typeCode, start_point_name, alignments, aim_name, grade_charter_mark, charter_start_date, charter_end_date, is_return_passenger, hiddenRuleMap)
                }

              }
            }

            if (industry_name == "普货") { //普货  --准运证
              if (isNewPeriod && permit_license_no == "null") { //从redis中获取准运证信息，如果为'null'
                hiddenName = "无准运证"
                superviseConfigId = "10"
                superviseRuleConfigId = "10"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                keyOfRedis = "notHaveTransportPermit_" + todayStr
                yesTodayKeyOfRedis = "notHaveTransportPermit_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且未查到准运证信息"

                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              } else {
                val permit_license_no_arr = permit_license_no.split("#")
                val pass_time_interval_arr = pass_time_interval.split("#")
                val pass_route_location_arr = pass_route_location.split("#")
                val gmt_end_arr = gmt_end.split("#")
                for (i <- 0 until permit_license_no_arr.length) {
                  if (isNewPeriod && permit_license_no_arr(i) != "null" && gmt_end_arr(i) != "" && dateTime > gmt_end_arr(i)) { //营运时间大于过期时间且 过期时间不为空串（空串暂不判断）
                    hiddenName = "准运证过期"
                    superviseConfigId = "9"
                    superviseRuleConfigId = "9"
                    content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName + ",过期时间为：" + gmt_end_arr(i)
                    keyOfRedis = "transportPermitExpire_" + todayStr
                    yesTodayKeyOfRedis = "transportPermitExpire_" + yesterdayStr
                    val newGenerateObjectCode = permit_license_no_arr(i)
                    //transport_supervise_topic="transport_supervise_topic_test"
                    val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且准运证已在" + gmt_end_arr(i) + "过期"

                    processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, newGenerateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
                  }
                  if (permit_license_no_arr(i) != "null" && pass_time_interval_arr(i) != "null" && pass_route_location_arr(i) != "null") {
                    //准运证路线周期
                    processPermitRoute(kafkaProducer, carGPS, jedis, industry, typeCode, pass_time_interval_arr(i), pass_route_location_arr(i),permit_license_no_arr(i), hiddenRuleMap)
                  }

                }
              }
            }

            if (industry_name == "普货" || industry_name == "危货") { //普货、危货  --通行证
              if (isNewPeriod && pass_plate_num == "null") { //从redis中获取通行证信息，如果为'null'
                hiddenName = "无通行证"
                superviseConfigId = "12"
                superviseRuleConfigId = "12"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                keyOfRedis = "notHaveTransportPass_" + todayStr
                yesTodayKeyOfRedis = "notHaveTransportPass_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且未查到通行证信息"

                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              } else {
                if (isNewPeriod && pass_plate_num != "null" && pass_gmt_end != "" && dateTime > pass_gmt_end) { //营运时间大于过期时间且 过期时间不为空串（空串暂不判断）
                  hiddenName = "通行证过期"
                  superviseConfigId = "11"
                  superviseRuleConfigId = "11"
                  content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName + ",过期时间为：" + carrier_liability_insurance_effective_date
                  keyOfRedis = "transportPassExpire_" + todayStr
                  yesTodayKeyOfRedis = "transportPassExpire_" + yesterdayStr
                  //transport_supervise_topic="transport_supervise_topic_test"
                  val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且通行证已在" + pass_gmt_end + "过期"

                  processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
                }
                if (pass_plate_num != "null" && pass_pass_time_interval != "null" && pass_pass_route_location != "null") {
                  //准运证路线周期
                  processPassRoute(kafkaProducer, carGPS, jedis, industry, typeCode, pass_pass_time_interval, pass_pass_route_location, hiddenRuleMap)
                }
              }
            }

            if (useNature == "70") { //班线客运  --班线

              logger.info("班车客运" + electronicFenceName)
              //电子围栏
              val electronicFenceIndex = electronicFenceMap.getOrElse(electronicFenceName, null)
              var area: List[Point2D.Double] = List.empty
              if (electronicFenceName != "null" && electronicFenceIndex != null) {

                area = getArea(electronicFenceIndex.area_geom)

              }
              if (isNewPeriod && tranship_plate_num == "null" && nowDateTime > "02:00" && nowDateTime < "05:00") {
                hiddenName = "未填报接驳单"
                superviseConfigId = "30"
                superviseRuleConfigId = "30"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                keyOfRedis = "notHaveTransportTranship_" + todayStr
                yesTodayKeyOfRedis = "notHaveTransportTranship_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"

                val generate_detail = carGPS.vehicleNo + "于" + carGPS.dateTime + "营运且未查到接驳单信息"

                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              }


              if (isNewPeriod && line_vehicle_id == "null") {
                //班线路线无记录
              } else if (carGPS.lon.toDouble > 1 && carGPS.lat.toDouble > 1 && area.nonEmpty && !getVehicleIsInArea(carGPS.lon.toDouble, carGPS.lat.toDouble, area)) {
                hiddenName = "客运包车超范围经营"
                superviseConfigId = "31"
                superviseRuleConfigId = "31"
                content = carGPS.vehicleNo + "_" + carGPS.dateTime + "_产生隐患：" + hiddenName
                keyOfRedis = "TransportOutOfScope_" + todayStr
                yesTodayKeyOfRedis = "TransportOutOfScope_" + yesterdayStr
                //transport_supervise_topic="transport_supervise_topic_test"
                val generate_detail = business_scope + "出" + electronicFenceIndex.name + "营运"


                processHidden(hiddenName, carGPS, jedis, industry, kafkaProducer, typeCode, content, keyOfRedis, yesTodayKeyOfRedis, superviseConfigId, superviseRuleConfigId, generateObject, generateObjectCode, transport_supervise_topic, tripStartTime, generate_detail, hiddenRuleMap)
              }
            }

            //超速高发
            //第一次出现超速之后，在redis中使用list存储该车辆的实时经纬度，key为：overSpeedCarGpsPoints_carGPS.vehicleNo_plateColor
            val carGpsPointsKey: String = "overSpeedCarGpsPoints_" + carGPS.vehicleNo + "_" + carGPS.vehicleColor
            if (jedis.exists(carGpsPointsKey)) {
              jedis.rpush(carGpsPointsKey, carGPS.lon + "," + carGPS.lat)
            }
            logger.info("dateTimeStr" + dateTimeStr + "industry" + industry)

            if (dateTimeStr >= "06:00:00" && dateTimeStr <= "22:00:00") { //白天

              if (industry_name == "危货" && carGPS.vec1.toDouble > 80) { //危货
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 80,hiddenRuleMap)
              } else if (industry_name == "客运" && carGPS.vec1.toDouble > 100) { //客运
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 100,hiddenRuleMap)
              } else if (industry_name == "普货" && carGPS.vec1.toDouble > 100) { //普货
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 100,hiddenRuleMap)
              } else if (industry_name == "出租车" && carGPS.vec1.toDouble > 120) { //出租车
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 120,hiddenRuleMap)
              }

            } else { //晚上

              if (industry_name == "危货" && carGPS.vec1.toDouble > 80) { //危货
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 80,hiddenRuleMap)
              } else if (industry_name == "客运" && carGPS.vec1.toDouble > 80) { //客运
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 80,hiddenRuleMap)
              } else if (industry_name == "普货" && carGPS.vec1.toDouble > 80) { //普货
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 80,hiddenRuleMap)
              } else if (industry_name == "出租车" && carGPS.vec1.toDouble > 100) { //出租车
                generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 100,hiddenRuleMap)
              }
            }


          }
        }
      }
    }
  }



  /**
   *
   * @param jedis
   * @param newGenerateObjectCode
   * @param enterpriseName
   * @param carGPS
   * @param keyOfRedis
   * @param yesTodayKeyOfRedis
   * @param hiddenRecordKeyOfRedis
   * @param tripStartTime
   * @return
   */
  def processEnterpriseHidden(jedis: Jedis, newGenerateObjectCode: String,enterpriseName:String,carGPS: CarGPS, keyOfRedis: String, yesTodayKeyOfRedis: String,hiddenRecordKeyOfRedis:String,tripStartTime:String) = {
    jedis.select(9)

    val todayStr: String = new DateTime().toString("yyyy-MM-dd")
    if (jedis.hget(keyOfRedis, carGPS.vehicleNo + "_" + carGPS.vehicleColor) == "1") {
      //break()
    } else { //redis没有存该车的过期记录
      jedis.hset(hiddenRecordKeyOfRedis,carGPS.vehicleNo+"_"+carGPS.vehicleColor,tripStartTime)
      jedis.hset(keyOfRedis, carGPS.vehicleNo + "_" + carGPS.vehicleColor, "1")

    }
    val yesterdayStr: String = DateTime.parse(todayStr, DateTimeFormat.forPattern("yyyy-MM-dd")).plusDays(-1).toString("yyyy-MM-dd")
    //删除昨天的redis数据
    if (jedis.exists(yesTodayKeyOfRedis)) {
      jedis.del(yesTodayKeyOfRedis)
    }
  }


  /*  /** 处理连续超速
     *
     * @param carGPSIter
     * @param jedis
     * @param kafkaProducer
     * @param continousTimes 连续超速的次数
     * @param industry       所属行业
     */
    def processOverSpeedContinous(carGPSIter: Iterable[CarGPS], jedis: Jedis, kafkaProducer: KafkaProducer[String, String], continousTimes: Int, industry: String, typeCode: String,industry_name:String): Unit = {

      try {
        if (null != carGPSIter && carGPSIter.size > 0) {
          val sortedCarGPSArr: Array[CarGPS] = carGPSIter.toArray
            .filter { data => data.vec1.toDouble < 200 }
            .sortBy(_.dateTime)

          if (sortedCarGPSArr.size > 0) {
            println("进入超速隐患判断")

            //                println("=======根据datetime排序后的gps=======1")
            //                sortedCarGPSArr.foreach(println(_)) //打印超速gps
            //                println("=======根据datetime排序后的gps=======2")
            for (carGPS <- sortedCarGPSArr) {
              val dateTimeStr: String = DateTime.parse(carGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("HH:mm:ss")

              //第一次出现超速之后，在redis中使用list存储该车辆的实时经纬度，key为：overSpeedCarGpsPoints_carGPS.vehicleNo_plateColor
              val carGpsPointsKey: String = "overSpeedCarGpsPoints_" + carGPS.vehicleNo + "_" + carGPS.vehicleColor
              if (jedis.exists(carGpsPointsKey)) {
                jedis.rpush(carGpsPointsKey, carGPS.lon + "," + carGPS.lat)
              }
              println("dateTimeStr"+dateTimeStr+"industry"+industry)

              if (dateTimeStr >= "06:00:00" && dateTimeStr <= "22:00:00") { //白天

                if (industry_name == "危货" && carGPS.vec1.toDouble > 80) {  //危货
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 80)
                } else if (industry_name == "客运" && carGPS.vec1.toDouble > 100) {  //客运
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 100)
                } else if (industry_name == "普货" && carGPS.vec1.toDouble > 100) {  //普货
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 100)
                } else if (industry_name == "出租车" && carGPS.vec1.toDouble > 120) {  //出租车
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 120)
                }

              } else { //晚上

                if (industry_name == "危货" && carGPS.vec1.toDouble > 60) {  //危货
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 60)
                } else if (industry_name == "客运" && carGPS.vec1.toDouble > 80) { //客运
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 80)
                } else if (industry_name == "普货" && carGPS.vec1.toDouble > 80) { //普货
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 80)
                } else if (industry_name == "出租车" && carGPS.vec1.toDouble > 110) {  //出租车
                  generateHiddenPeril(jedis, carGPS, continousTimes, kafkaProducer, industry, typeCode, 110)
                }

              }
            }
          }
        }
      } catch {
        case e:Exception => println("处理超速失败！")
      }
    }*/


  /**
   * 生成隐患 :超速高发（一路狂奔） 、处理超速（屡教不改）
   *
   * @param jedis
   * @param speedGPS
   * @param continousTimes
   * @param kafkaProducer
   * @param industry
   */
  def generateHiddenPeril(jedis: Jedis, speedGPS: CarGPS, continousTimes: Int, kafkaProducer: KafkaProducer[String, String], industry: String, typeCode: String, limitSpeed: Double,hiddenRuleMap: Map[String, Boolean]): Unit = {
    logger.info("超速高发判断")
    jedis.select(9)
    val overSpeedKey: String = "sortedOverSpeed_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor //生成redis的key值，redis 的保存类型为list
    if (!jedis.exists(overSpeedKey)) { //如果不存在该key,则创建key,保存时间区间
      logger.info("overSpeedKey不存在")
      val value = speedGPS.dateTime + "_" + DateTime.parse(speedGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).plusSeconds(75).toString("yyyy-MM-dd HH:mm:ss")
      jedis.lpush(overSpeedKey, value)
      jedis.expire(overSpeedKey, 60 * 20) //设置过期时间20分钟
      // 第一次超速之后，在redis中使用list存储该车的实时经纬度
      val carGpsPointsKey: String = "overSpeedCarGpsPoints_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
      jedis.rpush(carGpsPointsKey, speedGPS.lon + "," + speedGPS.lat)
      jedis.expire(carGpsPointsKey, 60 * 20) //过期时间为20分钟

      //第一次超速之后，存储超速点的 speed 和 dateTime。key为：overSpeedListKey_speedGPS.vehicleNo_speedGPS.vehicleColor
      val overSpeedListKey: String = "overSpeedListKey_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
      jedis.rpush(overSpeedListKey, speedGPS.vec1 + "," + speedGPS.dateTime)
      jedis.expire(overSpeedListKey, 60 * 20) //过期时间为20分钟

    } else { //如果存在该key，保存时间区间
      jedis.select(9)
      val dateStrArr: Array[String] = jedis.lindex(overSpeedKey, 0).split("_")
      if (null != dateStrArr && dateStrArr.size > 0) {
        val startTime = DateTime.parse(dateStrArr(0), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).plusSeconds(75).toString("yyyy-MM-dd HH:mm:ss")
        val endTime = DateTime.parse(dateStrArr(1), DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).plusSeconds(75).toString("yyyy-MM-dd HH:mm:ss")
        // println("==========已经存在了这辆车的超速gps==============1")
        // println(startTime + "_" + endTime + "===该gps点的速度为：" + speedGPS.vec1 + "===该gps点的dateTime为：" + speedGPS.dateTime)
        // println("==========已经存在了这辆车的超速gps==============2")

        if (speedGPS.dateTime >= startTime && speedGPS.dateTime < endTime) {

          jedis.rpush("overSpeedListKey_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor, speedGPS.vec1 + "," + speedGPS.dateTime)

          // val overSpeedList: util.List[String] = jedis.lrange("overSpeedListKey_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor, 0, -1)
          // println("========= 超速时的速度和时间："+overSpeedList.toString)
          // println("=======================")
          val length: lang.Long = jedis.lpush(overSpeedKey, startTime + "_" + endTime) //插入redis之后返回list的长度
          logger.info("判断隐患生成的list长度为5时：" + length)
          if (length == continousTimes&&hiddenRuleMap.getOrElse("7",false)) {
            logger.info("进入隐患处理！！！！！！！！！！！")
            val primaryKey = speedGPS.vehicleNo + "#" + speedGPS.vehicleColor
            val drivingPeriodStr: String = jedis.hget("hiddenDrivingPeriod", primaryKey)
            val drivingPeriod = JSON.parseObject(drivingPeriodStr, classOf[DrivingPeriod])
            val tripStartTime: String = drivingPeriod.ts


            val addr: (String, String, String, String) = GPSUtil.getAddressAndRoad(speedGPS.lon.toDouble, speedGPS.lat.toDouble)
            val inner_area=if (addr._1.contains("重庆市")) 1 else 0

            val jSONObject = new JSONObject()
            jSONObject.put("orderNumber", new IdWorker().nextId().toString)
            jSONObject.put("dealStatus", "1")
            jSONObject.put("generatedTime", speedGPS.dateTime)
            jSONObject.put("content", speedGPS.vehicleNo + "_" + speedGPS.dateTime + "_产生隐患：超速高发（一路狂奔）,当前速度为：" + speedGPS.vec1 + " km/h")
            jSONObject.put("superviseRuleConfigId", "7")
            jSONObject.put("industry", industry) //所属行业
            jSONObject.put("superviseConfigId", "7")
            jSONObject.put("businessId", "1")
            jSONObject.put("generateObject", speedGPS.vehicleNo)
            jSONObject.put("generate_object_code", speedGPS.vehicleNo)
            jSONObject.put("longitude", speedGPS.lon)
            jSONObject.put("latitude", speedGPS.lat)
            jSONObject.put("addressName", addr._1 + "_" + addr._4)
            jSONObject.put("inner_area", inner_area)
            jSONObject.put("dealDept", speedGPS.enterpriseName) //超速隐患该字段填企业名称
            jSONObject.put("dealDeptCode", speedGPS.dealDeptCode)
            jSONObject.put("deptCode", speedGPS.dealDeptCode)
            jSONObject.put("typeCode", typeCode)
            jSONObject.put("vehicleNo", speedGPS.vehicleNo)
            jSONObject.put("vehicleColor",if(speedGPS.vehicleColor=="蓝色") "1"
            else if(speedGPS.vehicleColor=="黄色") "2"
            else if(speedGPS.vehicleColor=="黑色") "3"
            else if(speedGPS.vehicleColor=="白色") "4"
            else if(speedGPS.vehicleColor=="绿色") "5"
            else "9")
            jSONObject.put("driverStartTime", tripStartTime)
            jSONObject.put("generate_detail", "车辆共超速"+continousTimes+"次，车速均大于"+limitSpeed+"km/h")
            //超速高发（一路狂奔）发送kafka
            kafkaProducer.send(new ProducerRecord[String, String]("transport_supervise_topic_ys", jSONObject.toJSONString))

            //超速隐患证据存入es中的：over_speed_trajectory_index/over_speed_trajectory_type
            overSpeedevidence2ES(jedis, "over_speed_trajectory_index", speedGPS, limitSpeed)

            //处理超速（屡教不改）
            alwaysOverSpeed(jedis, speedGPS, kafkaProducer, industry, typeCode,hiddenRuleMap)


            //连续超速算多次隐患，如10次连续时间间隔超速算两次隐患,清空redis包括本次一共5次超速记录
            jedis.del(overSpeedKey)

            val value = speedGPS.dateTime + "_" + DateTime.parse(speedGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).plusSeconds(60).toString("yyyy-MM-dd HH:mm:ss")
            jedis.expire(overSpeedKey, 60 * 20) //重新设置过期时间20分钟

            val carGpsPointsKey: String = "overSpeedCarGpsPoints_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
            jedis.del(carGpsPointsKey) //重新设置该车的实时经纬度
            jedis.expire(carGpsPointsKey, 60 * 20)

            val overSpeedListKey: String = "overSpeedListKey_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
            jedis.del(overSpeedListKey)//重新设置存储超速点的 speed 和 dateTime
            jedis.expire(overSpeedListKey, 60 * 20)
          }
        }else if (speedGPS.dateTime<startTime){


        } else {
          logger.info("当前超速时间不在最后超速时间的连续间隔75s时间区间内,清空之前未达到3次连续超速的数据，并将本次超速重新作为第一次超速")
          //当前超速时间不在最后超速时间的连续间隔60s时间区间内,清空之前未达到5次连续超速的数据，并将本次超速重新作为第一次超速
          jedis.del(overSpeedKey)
          val value = speedGPS.dateTime + "_" + DateTime.parse(speedGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).plusSeconds(60).toString("yyyy-MM-dd HH:mm:ss")
          jedis.lpush(overSpeedKey, value)
          jedis.expire(overSpeedKey, 60 * 20) //重新设置过期时间20分钟

          val carGpsPointsKey: String = "overSpeedCarGpsPoints_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
          jedis.del(carGpsPointsKey) //重新设置该车的实时经纬度
          jedis.rpush(carGpsPointsKey, speedGPS.lon + "," + speedGPS.lat)
          jedis.expire(carGpsPointsKey, 60 * 20)

          val overSpeedListKey: String = "overSpeedListKey_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
          jedis.del(overSpeedListKey)//重新设置存储超速点的 speed 和 dateTime
          jedis.rpush(overSpeedListKey, speedGPS.vec1 + "," + speedGPS.dateTime)
          jedis.expire(overSpeedListKey, 60 * 20)


        }
      }
    }
  }

  /**
   * 将超速证据插入es：over_speed_trajectory_index
   *
   * @param jedis
   * @param index
   * @param speedGPS
   */
  def overSpeedevidence2ES(jedis: Jedis, index: String, speedGPS: CarGPS, limitSpeed: Double): Unit = {
    jedis.select(9)

    val jestClient: JestClient = EsUtils.getClient()
    val typeStr: String = index.replace("index", "type")
    //list存储该车的实时经纬度
    import scala.collection.JavaConverters._
    val carGpsPointsKey: String = "overSpeedCarGpsPoints_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
    val lonAndLatList: util.List[String] = jedis.lrange(carGpsPointsKey, 0, -1)
    val lonAndLatsStr: String = lonAndLatList.asScala.filter(str => null != str).map { str =>
      val strings: Array[String] = str.split(",")
      "[" + strings(0) + "," + strings(1) + "]"
    }.toList.mkString("[", ",", "]")

    //超速点的 speed 和 dateTime。
    val overSpeedListKey: String = "overSpeedListKey_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor
    val overSpeedList: util.List[String] = jedis.lrange(overSpeedListKey, 0, -1)
    val speedAndTimeArr: Array[OverSpeed] = overSpeedList.asScala.map { str =>
      val strings: Array[String] = str.split(",")
      OverSpeed(strings(0), strings(1))
    }.toArray

    val dateStr: String = DateTime.parse(speedGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")

    val evidence: Evidence = Evidence(speedGPS.vehicleNo,
      speedGPS.vehicleColor,
      Array(Trajectory(lonAndLatsStr, dateStr)),
      dateStr + s" 车辆5分钟内共超速${jedis.llen(overSpeedListKey)}次车速均大于${limitSpeed} km/h",
      speedAndTimeArr,
      limitSpeed,
      speedGPS.dateTime,
      new DateTime().toString("yyyy-MM-dd HH:mm:ss")
    )
    //将超速隐患证据写入es
    EsUtils.insertIntoEs(jestClient, index, typeStr, evidence)
  }

  /**
   * 处理超速（屡教不改）
   *
   * @param jedis
   * @param speedGPS
   * @param kafkaProducer
   * @param industry
   */
  def alwaysOverSpeed(jedis: Jedis, speedGPS: CarGPS, kafkaProducer: KafkaProducer[String, String], industry: String, typeCode: String,hiddenRuleMap: Map[String, Boolean]): Unit = {
    jedis.select(9)

    val dateTime: String = DateTime.parse(speedGPS.dateTime, DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")).toString("yyyy-MM-dd")
    //与redis中的历史数据比对，并存入redis中保存30天
    val key = "overSpeed30DaysHistory_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor + "_" + dateTime
    if (!jedis.exists(key)) {
      jedis.setex(key, 30 * 24 * 60 * 60, "1")
    } else {
      val keys: util.Set[String] = jedis.keys("overSpeed30DaysHistory_" + speedGPS.vehicleNo + "_" + speedGPS.vehicleColor + "*")
      import scala.collection.JavaConverters._
      val filteredKeys: mutable.Set[String] = keys.asScala.filter(x => x != key)

      if (filteredKeys.nonEmpty&&hiddenRuleMap.getOrElse("8",false)) {
        val addr: (String, String, String, String) = GPSUtil.getAddressAndRoad(speedGPS.lon.toDouble, speedGPS.lat.toDouble)
        val inner_area=if (addr._1.contains("重庆市")) 1 else 0

        //超速高发（屡教不改）
        val jSONObject = new JSONObject()
        jSONObject.put("orderNumber", new IdWorker().nextId().toString)
        jSONObject.put("dealStatus", "1")
        jSONObject.put("generatedTime", speedGPS.dateTime)
        jSONObject.put("content", speedGPS.vehicleNo + "_" + speedGPS.dateTime + "_产生隐患：超速高发（屡教不改）,当前速度为：" + speedGPS.vec1 + " km/h")
        jSONObject.put("superviseRuleConfigId", "8")
        jSONObject.put("industry", industry) //所属行业
        jSONObject.put("superviseConfigId", "8")
        jSONObject.put("businessId", "1")
        jSONObject.put("generateObject", speedGPS.vehicleNo)
        jSONObject.put("generate_object_code", speedGPS.vehicleNo)
        jSONObject.put("longitude", speedGPS.lon)
        jSONObject.put("latitude", speedGPS.lat)
        jSONObject.put("addressName", addr._1 + "_" + addr._4)
        jSONObject.put("inner_area", inner_area)
        jSONObject.put("dealDept", speedGPS.enterpriseName) //超速隐患该字段填企业名称
        jSONObject.put("dealDeptCode", speedGPS.dealDeptCode)
        jSONObject.put("deptCode", speedGPS.dealDeptCode)
        jSONObject.put("typeCode", typeCode)
        jSONObject.put("vehicleNo", speedGPS.vehicleNo)
        jSONObject.put("vehicleColor",if(speedGPS.vehicleColor=="蓝色") "1"
        else if(speedGPS.vehicleColor=="黄色") "2"
        else if(speedGPS.vehicleColor=="黑色") "3"
        else if(speedGPS.vehicleColor=="白色") "4"
        else if(speedGPS.vehicleColor=="绿色") "5"
        else "9")
        //发送kafka
        kafkaProducer.send(new ProducerRecord[String, String]("transport_supervise_topic_ys", jSONObject.toJSONString))
      }
    }
  }
}

case class OverSpeed(var speed: String, var time: String)

case class Trajectory(var trajectory: String, var day: String)

//超速隐患证据
case class Evidence(
                     var vehicleNo: String,
                     var vehicleColor: String,
                     var trajectory: Array[Trajectory],
                     var overSpeedDesc: String,
                     var overSpeed: Array[OverSpeed],
                     var speedLimit: Double,
                     var generateTime: String,
                     var createTime: String)

case class ElectronicFenceIndex(@BeanProperty var adcode: String,
                                @BeanProperty var area_geom: Array[String],
                                @BeanProperty var center_point: String,
                                @BeanProperty var is_del: Long,
                                @BeanProperty var is_use: Long,
                                @BeanProperty var name: String,
                                @BeanProperty var parent_adcode: String)
